Refactor terminal manager onto Effect runtime #1525

Open
juliusmarminge wants to merge 11 commits into main from t3code/effect-native-terminal-manager-7

@juliusmarminge
Member

@juliusmarminge juliusmarminge commented Mar 29, 2026

Summary

  • Migrates the terminal manager to an Effect-based runtime and service layer.
  • Adds terminal shutdown escalation so stuck processes receive SIGKILL after the grace period.
  • Updates terminal session handling, persistence, and event streaming to support multi-terminal sessions and scoped cleanup.
  • Refreshes contracts and websocket handling to carry terminal IDs through terminal operations.

Testing

  • Not run (PR content only).
  • Existing coverage updated in apps/server/src/terminal/Layers/Manager.test.ts for lifecycle, persistence, subprocess activity, and shutdown behavior.
  • Existing coverage updated in apps/server/src/wsServer.test.ts for websocket protocol changes.

Note

High Risk
High risk because it rewrites terminal session lifecycle, persistence, and event delivery (including WebSocket wiring) with new concurrency and error-handling semantics. Regressions could break terminal IO, history storage, or cleanup/shutdown behavior across platforms.

Overview
Terminal session orchestration is rewritten to be Effect-native. TerminalManagerRuntime (EventEmitter + timers) is replaced with an Effect implementation built on SynchronizedRef state, per-thread Semaphore locking, scoped fibers, and PubSub-backed streamEvents.

Lifecycle, persistence, and polling behavior changes. History writes are debounced/coalesced via a new shared makeKeyedCoalescingWorker, subprocess polling moves to a fiber loop that only runs when sessions are active, and shutdown/close now performs SIGTERM→SIGKILL escalation via fibers with scoped cleanup.

Public API/contracts are updated. TerminalManagerShape removes subscribe()/dispose() in favor of streamEvents and introduces typed error variants (TerminalCwdError, TerminalHistoryError, etc.). The WebSocket server now forwards terminal events by consuming the stream, and terminal IPC input types switch to schema Encoded forms. Tests are migrated to @effect/vitest and updated for scoped shutdown and streaming semantics.

Written by Cursor Bugbot for commit cb8d16e. This will update automatically on new commits.
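
The SIGTERM→SIGKILL escalation described in the overview can be sketched without Effect as follows. This is only an illustration of the idea, not the PR's implementation: `KillableProcess`, `Schedule`, and `terminateWithEscalation` are hypothetical names, and a plain timer callback stands in for the scoped fibers the PR uses.

```typescript
// Hypothetical types for illustration; the PR's real PTY interfaces differ.
type Signal = "SIGTERM" | "SIGKILL";

interface KillableProcess {
  kill(signal: Signal): void;
  hasExited(): boolean;
}

// Injectable scheduler so tests can control time instead of waiting on real timers.
type Schedule = (delayMs: number, task: () => void) => void;

const realSchedule: Schedule = (delayMs, task) => {
  setTimeout(task, delayMs);
};

function terminateWithEscalation(
  proc: KillableProcess,
  graceMs: number,
  schedule: Schedule = realSchedule,
): void {
  if (proc.hasExited()) return;
  // Ask the process to stop first.
  proc.kill("SIGTERM");
  // After the grace period, force-kill only if the process is still alive.
  schedule(graceMs, () => {
    if (!proc.hasExited()) proc.kill("SIGKILL");
  });
}
```

Passing the scheduler as a parameter plays a role similar to the TestClock mentioned in the summaries: a test can run the delayed task immediately and assert on the signals sent.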

Note

Refactor terminal manager from EventEmitter class to Effect runtime with PubSub streaming

  • Replaces the imperative TerminalManagerRuntime EventEmitter class in Manager.ts with makeTerminalManagerWithOptions, an Effect-based factory using SynchronizedRef, Semaphore, PubSub, and FileSystem.
  • Introduces a streamEvents getter on TerminalManagerShape (replacing subscribe()/dispose()); consumers in wsServer.ts now use Stream.runForEach over the stream instead of callback registration.
  • Adds KeyedCoalescingWorker in packages/shared to debounce and coalesce per-session history persistence writes via a background fiber.
  • Replaces the generic TerminalError with typed variants (TerminalCwdError, TerminalHistoryError, TerminalSessionLookupError, TerminalNotRunningError) in Manager.ts.
  • Rewrites the test suite in Manager.test.ts to use @effect/vitest with TestClock, scoped resources, and stream-based event assertions instead of imperative fs and event emitters.
  • Risk: event consumers must migrate from subscribe()/dispose() to streamEvents; schema validation on terminal operation inputs is removed.

Macroscope summarized cb8d16e.
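
To make the "debounce and coalesce per-session history writes" idea concrete, here is a minimal synchronous sketch. It is not the shared worker from this PR (which runs a background fiber over transactional state); `makeCoalescingQueueSketch` and its shape are hypothetical and only model the coalescing rule: several values enqueued for one key before it is drained result in a single write of the latest value.

```typescript
// Hypothetical, synchronous model of keyed coalescing; not the PR's worker.
interface CoalescingQueueSketch<K, V> {
  enqueue(key: K, value: V): void;
  drainKey(key: K): boolean;
}

function makeCoalescingQueueSketch<K, V>(
  process: (key: K, value: V) => void,
): CoalescingQueueSketch<K, V> {
  const latestByKey = new Map<K, V>();

  return {
    // A later value for the same key overwrites one that has not been processed yet.
    enqueue(key, value) {
      latestByKey.set(key, value);
    },
    // Process the pending value for one key, if any. Returns true when work was done.
    drainKey(key) {
      if (!latestByKey.has(key)) return false;
      const value = latestByKey.get(key) as V;
      // Remove before processing so a value enqueued during processing survives for the next drain.
      latestByKey.delete(key);
      process(key, value);
      return true;
    },
  };
}
```

For example, enqueueing `"a"` and then `"ab"` for the same key followed by one `drainKey` call invokes `process` once, with `"ab"`.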

- Move terminal lifecycle and PTY callbacks onto Effect-managed layers
- Update contracts and WebSocket paths for terminalId-aware operations
- Expand tests for streaming, shutdown, and session retention behavior
- Run kill escalation inline during shutdown cleanup
- Treat subprocess checks as optional and preserve state updates
- Add coverage for SIGKILL escalation after grace period
@coderabbitai

coderabbitai bot commented Mar 29, 2026

Important

Review skipped

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 43286779-8e39-4b74-9dec-444b9cd2b59f

You can disable this status message by setting reviews.review_status to false in the CodeRabbit configuration file.

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions bot added labels on Mar 29, 2026: vouch:trusted (PR author is trusted by repo permissions or the VOUCHED list) and size:XXL (1,000+ changed lines, additions + deletions).
Contributor

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for all 3 issues found in the latest run.

  • ✅ Fixed: Default test helper path ignores historyLineLimit parameter
    • Eliminated the dual-path approach in makeManager so it always uses makeTerminalManagerWithOptions with the provided historyLineLimit parameter instead of a fast path via TerminalManagerLive that silently defaulted to 5000.
  • ✅ Fixed: Falsy zero-value options prevent custom runtime creation
    • Replaced truthy checks with !== undefined checks for all numeric options (subprocessPollIntervalMs, processKillGraceMs, maxRetainedInactiveSessions) so explicit zero values are correctly propagated.
  • ✅ Fixed: Eviction test wait condition is always true
    • Changed the wait condition from the tautological ptyAdapter.processes.length === 2 to waiting for 2 'exited' events in the event stream, ensuring exit handlers and eviction complete before assertions.
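
The second fix above (truthy checks versus `!== undefined`) is easy to reproduce in isolation. The sketch below reuses the option name `processKillGraceMs` from the diff; `TimingOptions`, `forwardTruthy`, and `forwardDefined` are hypothetical helpers written for illustration.

```typescript
interface TimingOptions {
  processKillGraceMs?: number;
}

// Buggy pattern: a truthy check silently drops an explicit 0.
function forwardTruthy(options: TimingOptions): TimingOptions {
  return {
    ...(options.processKillGraceMs
      ? { processKillGraceMs: options.processKillGraceMs }
      : {}),
  };
}

// Fixed pattern: only a missing value is skipped, so 0 is forwarded.
function forwardDefined(options: TimingOptions): TimingOptions {
  return {
    ...(options.processKillGraceMs !== undefined
      ? { processKillGraceMs: options.processKillGraceMs }
      : {}),
  };
}
```

With `{ processKillGraceMs: 0 }` as input, `forwardTruthy` returns an empty object while `forwardDefined` keeps the zero, which matters for tests that want an immediate kill escalation.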

Create PR

Or push these changes by commenting:

@cursor push 3b9d72eea8
Preview (3b9d72eea8)
diff --git a/apps/server/src/terminal/Layers/Manager.test.ts b/apps/server/src/terminal/Layers/Manager.test.ts
--- a/apps/server/src/terminal/Layers/Manager.test.ts
+++ b/apps/server/src/terminal/Layers/Manager.test.ts
@@ -12,17 +12,15 @@
 import { Effect, Encoding, Exit, Layer, ManagedRuntime, Ref, Scope, Stream } from "effect";
 import { afterEach, describe, expect, it } from "vitest";
 
-import { ServerConfig } from "../../config";
 import { TerminalManager } from "../Services/Manager";
 import {
-  PtyAdapter,
   type PtyAdapterShape,
   type PtyExitEvent,
   type PtyProcess,
   type PtySpawnInput,
   PtySpawnError,
 } from "../Services/PTY";
-import { makeTerminalManagerWithOptions, TerminalManagerLive } from "./Manager";
+import { makeTerminalManagerWithOptions } from "./Manager";
 
 class FakePtyProcess implements PtyProcess {
   readonly writes: string[] = [];
@@ -194,13 +192,29 @@
   const logsDir = path.join(baseDir, "userdata", "logs", "terminals");
   const ptyAdapter = options.ptyAdapter ?? new FakePtyAdapter();
 
-  const terminalLayer = TerminalManagerLive.pipe(
-    Layer.provideMerge(Layer.succeed(PtyAdapter, ptyAdapter)),
-    Layer.provideMerge(ServerConfig.layerTest(process.cwd(), baseDir)),
-    Layer.provideMerge(NodeServices.layer),
-  );
+  const layer = Layer.effect(
+    TerminalManager,
+    makeTerminalManagerWithOptions({
+      logsDir,
+      historyLineLimit,
+      ptyAdapter,
+      ...(options.shellResolver !== undefined ? { shellResolver: options.shellResolver } : {}),
+      ...(options.subprocessChecker !== undefined
+        ? { subprocessChecker: options.subprocessChecker }
+        : {}),
+      ...(options.subprocessPollIntervalMs !== undefined
+        ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs }
+        : {}),
+      ...(options.processKillGraceMs !== undefined
+        ? { processKillGraceMs: options.processKillGraceMs }
+        : {}),
+      ...(options.maxRetainedInactiveSessions !== undefined
+        ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions }
+        : {}),
+    }),
+  ).pipe(Layer.provideMerge(NodeServices.layer));
 
-  const runtime = ManagedRuntime.make(terminalLayer);
+  const runtime = ManagedRuntime.make(layer);
   const manager = await runtime.runPromise(Effect.service(TerminalManager));
   const eventsRef = await Effect.runPromise(Ref.make<TerminalEvent[]>([]));
   const eventScope = await Effect.runPromise(Scope.make("sequential"));
@@ -210,60 +224,6 @@
     ).pipe(Effect.forkIn(eventScope)),
   );
 
-  if (
-    historyLineLimit !== 5 ||
-    options.shellResolver ||
-    options.subprocessChecker ||
-    options.subprocessPollIntervalMs ||
-    options.processKillGraceMs ||
-    options.maxRetainedInactiveSessions
-  ) {
-    await runtime.dispose();
-
-    const customLayer = Layer.effect(
-      TerminalManager,
-      makeTerminalManagerWithOptions({
-        logsDir,
-        historyLineLimit,
-        ptyAdapter,
-        ...(options.shellResolver ? { shellResolver: options.shellResolver } : {}),
-        ...(options.subprocessChecker ? { subprocessChecker: options.subprocessChecker } : {}),
-        ...(options.subprocessPollIntervalMs
-          ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs }
-          : {}),
-        ...(options.processKillGraceMs ? { processKillGraceMs: options.processKillGraceMs } : {}),
-        ...(options.maxRetainedInactiveSessions
-          ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions }
-          : {}),
-      }),
-    ).pipe(Layer.provideMerge(NodeServices.layer));
-
-    const customRuntime = ManagedRuntime.make(customLayer);
-    const customManager = await customRuntime.runPromise(Effect.service(TerminalManager));
-    const customEventsRef = await Effect.runPromise(Ref.make<TerminalEvent[]>([]));
-    const customEventScope = await Effect.runPromise(Scope.make("sequential"));
-    await customRuntime.runPromise(
-      Stream.runForEach(customManager.streamEvents, (event) =>
-        Ref.update(customEventsRef, (events) => [...events, event]),
-      ).pipe(Effect.forkIn(customEventScope)),
-    );
-
-    return {
-      baseDir,
-      logsDir,
-      ptyAdapter,
-      runtime: customRuntime,
-      manager: customManager,
-      eventsRef: customEventsRef,
-      run: <A, E>(effect: Effect.Effect<A, E>) => customRuntime.runPromise(effect),
-      getEvents: () => Effect.runPromise(Ref.get(customEventsRef)),
-      dispose: async () => {
-        await Effect.runPromise(Scope.close(customEventScope, Exit.void));
-        await customRuntime.dispose();
-      },
-    };
-  }
-
   return {
     baseDir,
     logsDir,
@@ -636,7 +596,7 @@
   });
 
   it("evicts oldest inactive terminal sessions when retention limit is exceeded", async () => {
-    const { manager, ptyAdapter, run, logsDir } = await createManager(5, {
+    const { manager, ptyAdapter, run, logsDir, getEvents } = await createManager(5, {
       maxRetainedInactiveSessions: 1,
     });
 
@@ -656,7 +616,10 @@
     await new Promise((resolve) => setTimeout(resolve, 5));
     second.emitExit({ exitCode: 0, signal: 0 });
 
-    await waitFor(() => ptyAdapter.processes.length === 2);
+    await waitFor(async () => {
+      const events = await getEvents();
+      return events.filter((e) => e.type === "exited").length === 2;
+    });
 
     const reopenedSecond = await run(manager.open(openInput({ threadId: "thread-2" })));
     const reopenedFirst = await run(manager.open(openInput({ threadId: "thread-1" })));

- Treat omitted terminal IDs as the default session ID in manager operations
- Simplify terminal manager tests around retained session eviction
- Update terminal contract input types to use encoded schema types
Contributor

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Persist worker exits without processing newly arrived data
    • Changed the cleanup block to return a boolean indicating whether new pendingHistory arrived during the race window, and continue the loop instead of returning when it did, ensuring the worker processes newly queued data.

Create PR

Or push these changes by commenting:

@cursor push 414a2e4f49
Preview (414a2e4f49)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -837,15 +837,21 @@
         });
 
         if (!startState) {
-          yield* modifyManagerState((state) => {
+          const hasPending = yield* modifyManagerState((state) => {
             const existing = state.persistStates.get(sessionKey);
-            if (!existing || existing.pendingHistory !== null) {
-              return [undefined, state] as const;
+            if (!existing) {
+              return [false, state] as const;
             }
+            if (existing.pendingHistory !== null) {
+              return [true, state] as const;
+            }
             const persistStates = new Map(state.persistStates);
             persistStates.delete(sessionKey);
-            return [undefined, { ...state, persistStates }] as const;
+            return [false, { ...state, persistStates }] as const;
           });
+          if (hasPending) {
+            continue;
+          }
           return;
         }

- Add shared CoalescingDrainableWorker utility and tests
- Simplify terminal history persistence to drain per key
- Persist session history on close before removing session state
Contributor

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for both issues found in the latest run.

  • ✅ Fixed: Worker error leaves key permanently stuck in activeKeys
    • Added Effect.onError handler around processKey to remove the key from activeKeys when the process effect fails, preventing drainKey/drain from hanging forever.
  • ✅ Fixed: Subprocess polling runs forever without active sessions
    • Added a hasRunningSessions gate and early-return in pollSubprocessActivity so subprocess checker invocations are skipped entirely when no running sessions exist.
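
The first issue above, a key left in `activeKeys` after a failed processor, comes down to releasing bookkeeping on every exit path. The sketch below shows that idea in plain TypeScript with `try`/`finally`. This is a different technique from the `Effect.onError` handler in the autofix, and `processWithCleanup` is a hypothetical helper.

```typescript
// Hypothetical helper: mark a key active while work runs, and always release it.
function processWithCleanup<K>(
  activeKeys: Set<K>,
  key: K,
  work: () => void,
): void {
  activeKeys.add(key);
  try {
    work();
  } finally {
    // Runs on success and on failure, so a throwing processor cannot leave the key stuck.
    activeKeys.delete(key);
  }
}
```

Anything that waits for `activeKeys` to become empty, as `drainKey` does in the diff, can then make progress even when the processor throws.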

Create PR

Or push these changes by commenting:

@cursor push 92c0e406e4
Preview (92c0e406e4)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -1377,6 +1377,10 @@
           session.status === "running" && Number.isInteger(session.pid),
       );
 
+      if (runningSessions.length === 0) {
+        return;
+      }
+
       yield* Effect.forEach(
         runningSessions,
         (session) =>
@@ -1434,8 +1438,20 @@
       );
     });
 
+    const hasRunningSessions = readManagerState.pipe(
+      Effect.map((state) => [...state.sessions.values()].some((s) => s.status === "running")),
+    );
+
     yield* Effect.forever(
-      Effect.sleep(subprocessPollIntervalMs).pipe(Effect.flatMap(() => pollSubprocessActivity())),
+      hasRunningSessions.pipe(
+        Effect.flatMap((active) =>
+          active
+            ? pollSubprocessActivity().pipe(
+                Effect.flatMap(() => Effect.sleep(subprocessPollIntervalMs)),
+              )
+            : Effect.sleep(subprocessPollIntervalMs),
+        ),
+      ),
     ).pipe(Effect.forkIn(workerScope));
 
     yield* Effect.addFinalizer(() =>

diff --git a/packages/shared/src/CoalescingDrainableWorker.ts b/packages/shared/src/CoalescingDrainableWorker.ts
--- a/packages/shared/src/CoalescingDrainableWorker.ts
+++ b/packages/shared/src/CoalescingDrainableWorker.ts
@@ -77,7 +77,17 @@
           ] as const;
         }).pipe(Effect.tx),
       ),
-      Effect.flatMap((item) => (item === null ? Effect.void : processKey(item.key, item.value))),
+      Effect.flatMap((item) =>
+        item === null
+          ? Effect.void
+          : Effect.onError(processKey(item.key, item.value), () =>
+              TxRef.update(stateRef, (state) => {
+                const activeKeys = new Set(state.activeKeys);
+                activeKeys.delete(item.key);
+                return { ...state, activeKeys };
+              }),
+            ),
+      ),
       Effect.forever,
       Effect.forkScoped,
     );

- Skip subprocess polling until a terminal session is active
- Requeue a key after processor failure so draining continues
- Add regression coverage for both behaviors
Contributor

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for both issues found in the latest run.

  • ✅ Fixed: Double disposal of runtime in shutdown test
    • Added a disposed guard flag to make the dispose() function idempotent, preventing the afterEach hook from double-closing the scope and runtime.
  • ✅ Fixed: Unused drain export in coalescing worker
    • Removed the unused drain property from both the CoalescingDrainableWorker interface and its implementation, as no consumer of this worker type uses it.
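
The `disposed` guard from the first fix generalizes to a small wrapper. The sketch below is a hypothetical helper (`makeIdempotentDispose`), not code from the PR, that makes any async cleanup function safe to call more than once.

```typescript
// Hypothetical wrapper: the underlying cleanup runs at most once.
function makeIdempotentDispose(
  cleanup: () => Promise<void>,
): () => Promise<void> {
  let disposed = false;
  return async () => {
    // Later calls return immediately, mirroring the guard flag in the diff.
    if (disposed) return;
    disposed = true;
    await cleanup();
  };
}
```

As with the diff, a second call does not wait for a first call that is still in flight; it simply returns. A test harness whose `afterEach` hook may run after an explicit `dispose()` is the case this guards against.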

Create PR

Or push these changes by commenting:

@cursor push 60a613cd93
Preview (60a613cd93)
diff --git a/apps/server/src/terminal/Layers/Manager.test.ts b/apps/server/src/terminal/Layers/Manager.test.ts
--- a/apps/server/src/terminal/Layers/Manager.test.ts
+++ b/apps/server/src/terminal/Layers/Manager.test.ts
@@ -211,6 +211,7 @@
     ).pipe(Effect.forkIn(eventScope)),
   );
 
+  let disposed = false;
   return {
     baseDir,
     logsDir,
@@ -221,6 +222,8 @@
     run: <A, E>(effect: Effect.Effect<A, E>) => runtime.runPromise(effect),
     getEvents: () => Effect.runPromise(Ref.get(eventsRef)),
     dispose: async () => {
+      if (disposed) return;
+      disposed = true;
       await Effect.runPromise(Scope.close(eventScope, Exit.void));
       await runtime.dispose();
     },

diff --git a/packages/shared/src/CoalescingDrainableWorker.ts b/packages/shared/src/CoalescingDrainableWorker.ts
--- a/packages/shared/src/CoalescingDrainableWorker.ts
+++ b/packages/shared/src/CoalescingDrainableWorker.ts
@@ -13,7 +13,6 @@
 export interface CoalescingDrainableWorker<K, V> {
   readonly enqueue: (key: K, value: V) => Effect.Effect<void>;
   readonly drainKey: (key: K) => Effect.Effect<void>;
-  readonly drain: Effect.Effect<void>;
 }
 
 interface CoalescingWorkerState<K, V> {
@@ -126,16 +125,6 @@
         Effect.asVoid,
       );
 
-    const drain: CoalescingDrainableWorker<K, V>["drain"] = TxRef.get(stateRef).pipe(
-      Effect.tap((state) =>
-        state.latestByKey.size > 0 || state.queuedKeys.size > 0 || state.activeKeys.size > 0
-          ? Effect.txRetry
-          : Effect.void,
-      ),
-      Effect.asVoid,
-      Effect.tx,
-    );
-
     const drainKey: CoalescingDrainableWorker<K, V>["drainKey"] = (key) =>
       TxRef.get(stateRef).pipe(
         Effect.tap((state) =>
@@ -147,5 +136,5 @@
         Effect.tx,
       );
 
-    return { enqueue, drainKey, drain } satisfies CoalescingDrainableWorker<K, V>;
+    return { enqueue, drainKey } satisfies CoalescingDrainableWorker<K, V>;
   });

- Rename the shared coalescing worker API and exports
- Switch terminal persistence to the keyed worker
- Drop the unused global drain helper
Contributor

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: All stat errors incorrectly mapped to "notFound" reason
    • Changed assertValidCwd to only map PlatformError with reason._tag "NotFound" to TerminalCwdError, while re-raising all other stat errors (permission-denied, IO errors, etc.) as defects via Effect.die, matching the original pre-refactor ENOENT-only handling.
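
The distinction this fix draws (only a missing path counts as "notFound") can be sketched as a small classifier. The sketch assumes Node-style errors that carry a string `code` such as `ENOENT`, whereas the PR inspects `reason._tag` on a PlatformError. `classifyStatFailure` is a hypothetical helper, and the `"statFailed"` reason follows the follow-up commit in this thread rather than the `Effect.die` used by the autofix.

```typescript
type CwdFailureReason = "notFound" | "statFailed";

// Assumption: errors look like Node's fs errors, with a string `code` field.
function classifyStatFailure(error: unknown): CwdFailureReason {
  const code =
    typeof error === "object" && error !== null && "code" in error
      ? (error as { code?: unknown }).code
      : undefined;
  // Only a missing path is "notFound"; permission or IO failures are kept distinct.
  return code === "ENOENT" ? "notFound" : "statFailed";
}
```

Keeping the two reasons separate means a permission-denied working directory is not reported to the user as a missing directory.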

Create PR

Or push these changes by commenting:

@cursor push 02e642acce
Preview (02e642acce)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -977,13 +977,16 @@
 
     const assertValidCwd = Effect.fn("terminal.assertValidCwd")(function* (cwd: string) {
       const stats = yield* fileSystem.stat(cwd).pipe(
-        Effect.mapError(
-          (cause) =>
-            new TerminalCwdError({
-              cwd,
-              reason: "notFound",
-              cause,
-            }),
+        Effect.catch((error) =>
+          error.reason._tag === "NotFound"
+            ? Effect.fail(
+                new TerminalCwdError({
+                  cwd,
+                  reason: "notFound",
+                  cause: error,
+                }),
+              )
+            : Effect.die(error),
         ),
       );
       if (stats.type !== "Directory") {

- Preserve non-NotFound cwd stat errors as statFailed
- Add coverage for permission-denied cwd stat failures
- Skip repeated cleanup calls in the test harness
- Prevent double-closing the event scope and runtime
Contributor

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is ON. A cloud agent has been kicked off to fix the reported issue. You can view the agent here.

- migrate test setup to Effect/Vitest layers
- add coverage for async timing and lifecycle cases
Labels

size:XXL (1,000+ changed lines, additions + deletions)
vouch:trusted (PR author is trusted by repo permissions or the VOUCHED list)

1 participant